Inceptor StreamSQL简介
一些流处理平台(比如Spark Streaming,Storm)通常要求用户在创建流处理应用时必须用Java或者Scala进行开发,包括早期的TDH(4.3之前)也是如此。这要求业务人员只有对框架以及流处理本身,甚至是底层技术足够熟悉,才能写出高效的流处理程序。大大地限制了流处理的推广和应用,架高了流处理应用的开发门槛,对于数据科学家和数据分析者而言增加了操作难度,导致无法将精力完全投入在业务分析上。
Inceptor StreamSQL简介
一些流处理平台(比如Spark Streaming,Storm)通常要求用户在创建流处理应用时必须用Java或者Scala进行开发,包括早期的TDH(4.3之前)也是如此。这要求业务人员只有对框架以及流处理本身,甚至是底层技术足够熟悉,才能写出高效的流处理程序。大大地限制了流处理的推广和应用,架高了流处理应用的开发门槛,对于数据科学家和数据分析者而言增加了操作难度,导致无法将精力完全投入在业务分析上。
为降低流应用开发的入门要求,星环从TDH 4.3开始,在Transwarp Stream中引入全新的StreamSQL,允许用户通过SQL实现业务逻辑。StreamSQL几乎可以应对所有类型的业务场景,包括ETL工具,规则报警工具等简单业务场景。为了实现更复杂的业务逻辑,它还对PL/SQL作为高级功能提供了优美的支持。从4.8开始,StreamSQL又新增了基于事件驱动模式的流处理功能,在低延迟处理方面的性能表现更为出色。
关于StreamSQL的几个重要概念
StreamSQL有三个核心概念:Stream、StreamJob 和 Application。概括地说,Stream是数据流,StreamJob是对一个或多个Stream进行计算并将结果写进一张表的任务,Application是一个或多个StreamJob的集合。
Stream
Stream分为两种:Input Stream和Derived Stream。直接用于接收数据源传来的数据称为Input Stream;对已有Stream进行变形得到的新的Stream称为Derived Stream。
StreamJob
StreamSQL中的Stream是静态的——它们仅仅描述了如何对数据源传来的数据进行接收和变形的计划,但并不执行这些计划。要让StreamSQL执行计划,需要有相应的Action操作来触发StreamJob。启动一个StreamJob时,StreamSQL会为每一个Input Stream启动一组称为 receiver的任务来接收数据,接收的数据经过一系列Derived Stream的变形最终被插入一张表,供用户查询。
Application
Application是一组业务逻辑相关的StreamJob的集合。合理地使用Application划分StreamJob可以实现资源的共享和隔离。之后我们会有文章对Application的隔离能力进行专门介绍。
StreamSQL的简单示例
快速入门
这里将通过一个简单的例子,对StreamSQL的使用方法做基本介绍。我们会使用Kafka的console工具生成一些简单的数据,并让StreamSQL来处理。该演示包含三部分:
1. 建一个Kafka数据源;
2. 在Inceptor中建一个Stream并触发StreamJob;
3. 在Inceptor中处理Stream从Kafka数据源接收的数据。
建Kafka数据源
1. 登录Kafka节点
登陆集群中任意安装了Kafka的节点。进入/usr/lib/kafka/bin目录,该目录下有建Kafka数据源的所需要的脚本。
2. 建一个Kafka Topic
执行下面指令,运行/usr/lib/kafka/bin目录下的kafka-create-topic.sh脚本:
该指令提供了如下信息:topic名称为demo,使用172.16.1.128上的Zookeeper,分3个partition。
3. 查看Kafka Topic
执行下面指令,运行/usr/lib/kafka/bin目录下的kafka-topics.sh脚本:
我们可以看到刚才建的名为demo的topic,和它的Partition信息。
4. 建Kafka Producer并发布消息
执行下面指令,运行/usr/lib/kafka/bin目录下的kafka-console-producer.sh脚本:
该指令的含义为:指定使用172.16.1.128节点为Kafka Broker,并且指定Producer发布消息的topic为demo。现在,我们可以在命令行中输入一些消息,这些消息都将被发布给demo:
5. 建好数据源
至此,已经建好了一个Kafka数据源,并发布了一些消息。先不要停止上面Producer的进程,让它保持运行,你可以继续在命令行中输入消息。现在打开另一个窗口登陆集群,进入Inceptor,建一个Stream并触发StreamJob的开始。
建Stream及触发StreamJob
1. 登录Inceptor
登陆集群中的任意一个节点,连接到Inceptor。这里,我们以hive用户身份连接一个LDAP认证的Inceptor Server 2。
此处的port由Transwarp Manager配置页上的参数 hive.server2.thrift.port 配置,默认为10010。
2. 建一个Stream
该StreamSQL语句建了一个名为demo_stream的Stream,它使用Kafka为源,接收发布给名为demo的topic的消息,将接收的消息按“,”分隔为两列:id(类型为INT)和letter(类型为STRING)。
3. 查看Stream
通过SHOW STREAMS查看刚才创建的Stream。
我们可以看到结果中出现了刚刚建好的demo_stream。
4. 创建并触发一个StreamJob
a. 建一张新表demo_table,它需要和demo_stream有相同的schema:
b. 向demo_table插入demo_stream中的数据,这个操作会触发StreamJob的执行:
5. 列出正在运行的StreamJob
执行下面指令:
我们可以看到如下输出:
输出结果包含streamid,触发StreamJob的sql和状态。
6. 在Inceptor管理界面查看StreamJob运行状态
打开浏览器,访问http://<inceptor_server_ip>:<port>,可以在Inceptor的管理界面看到当前正在运行的StreamJob,其中此处的port由Transwarp Manager配置界面上的 inceptor.ui.port 参数决定,默认为4044:
7. 接收消息
此时demo_stream已经开始接收发布到之前创建的demo的消息。需要注意的是,demo_stream对发布到demo的消息的接收是从streamid=29008ed34b9e45bca784362948b88a85的StreamJob触发开始的,也就可以看到在“Active Stages”下有正在运行的demo_stream。
是说从执行INSERT开始,因此在执行INSERT之前发布到demo的消息都不会被demo_stream接收。所以,目前demo_table中没有任何记录:
接收并处理Kafka传来的数据
1. 切换到在运行的Kafka Producer的交互界面:
由于“hello”,“world”都是在streamid=29008ed34b9e45bca784362948b88a85的StreamJob触发前发布的,这两条消息都不会被demo_stream接收。
2. 在命令行中输入一些数据
由于已经规定了demo_stream接收消息的类型是由“,”分隔的两列文本,我们需要输入这个类型的消息,以便demo_stream处理。输入:
3. 查看数据
切换到Inceptor命令行的窗口,查看demo_table中的数据,我们可以看到demo_table中出现了我们刚才发布的四条消息:
4. 现在可以在demo_table上进行一些查询:
停止Streamjob
演示到此结束,用下面指令可以停止streamid为29008ed34b9e45bca784362948b88a85的streamjob:
StreamSQL的优势
相对于采用编程(Scala/Java)的方式开发流应用,采用StreamSQL具有以下优势:
微批模式和事件驱动模式的一体化
在同一套系统里,用户可以根据业务需求,灵活切换微批模式的流处理和事件驱动模式的流处理。
极高的易用性
而使用StreamSQL,用户只需要有编写普通SQL的经验,就可以写出高效、安全、稳定的流处理应用。
性能提升
在一些条件下,采用StreamSQL的方式甚至比编程方式获得更高的性能提升。这是因为StreamSQL做了一些特殊优化,在编程模式下无法轻易实现。
产品化程度高
通过编程的方式来实现流处理的另一个问题是产品化程度非常低。由于编程有较高的自由度,出现问题的可能性很大;而又由于编程的方式将流处理平台和用户程序绑定在一起,用户没办法及时分析出出错问题的root cause。SQL作为一个通用的接口将大大地提高产品化程度。
迁移成本低
用户原有的很多业务逻辑是通过SQL实现,如果通过编程的方式迁移到流上,迁移成本非常高,难以保证迁移后逻辑的正确性。而一旦采用StreamSQL,用户只需要修改少量SQL,迁移成本几乎接近零。
由于具备上述几项优点,Inceptor StreamSQL将会在建立流式应用时,表现出其强大的业务应对能力和易用性。用户将发现流式分析的实现过程也可以很便捷。
往期原创文章
由星环大数据产品剖析基于SQL on Hadoop的数据仓库技术
大数据开放实验室由星环信息科技(上海)有限公司运营,专门致力于大数据技术的研究和传播。若转载请在文章开头明显注明“文章来源于微信订阅号——大数据开放实验室”,并保留作者和账号介绍。